﻿// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchedJoinBlock.cs
//
//
// A propagator block that groups individual messages of multiple types
// into tuples of arrays of those messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Threading.Tasks.Dataflow.Internal;

namespace System.Threading.Tasks.Dataflow
{
	/// <summary>
	/// Represents a dataflow block that gathers a specified number of inputs, potentially of
	/// differing types, offered to one or more of its targets, and emits them together as a batch.
	/// </summary>
	/// <typeparam name="T1">The type of data accepted by the block's first target.</typeparam>
	/// <typeparam name="T2">The type of data accepted by the block's second target.</typeparam>
	[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
	[DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
	public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
	{
		/// <summary>Number of items that make up one batch.</summary>
		private readonly int _batchSize;
		/// <summary>Bookkeeping state that both targets share.</summary>
		private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
		/// <summary>Target that buffers the inputs of type T1.</summary>
		private readonly BatchedJoinBlockTarget<T1> _target1;
		/// <summary>Target that buffers the inputs of type T2.</summary>
		private readonly BatchedJoinBlockTarget<T2> _target2;
		/// <summary>Source half of the block, which holds and propagates the produced batches.</summary>
		private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;

		/// <summary>Creates a <see cref="BatchedJoinBlock{T1,T2}"/> that uses the default grouping options.</summary>
		/// <param name="batchSize">The number of items to group into a batch.</param>
		/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
		public BatchedJoinBlock(int batchSize)
			: this(batchSize, GroupingDataflowBlockOptions.Default)
		{
		}

		/// <summary>Creates a <see cref="BatchedJoinBlock{T1,T2}"/> configured with the supplied options.</summary>
		/// <param name="batchSize">The number of items to group into a batch.</param>
		/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
		/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
		/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
		/// <exception cref="System.ArgumentException">
		/// The <paramref name="dataflowBlockOptions"/> request non-greedy mode or a bounded capacity, neither of which is supported.
		/// </exception>
		public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
		{
			// Preconditions
			if (batchSize < 1)
			{
				throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
			}
			if (dataflowBlockOptions == null)
			{
				throw new ArgumentNullException("dataflowBlockOptions");
			}
			if (!dataflowBlockOptions.Greedy)
			{
				throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
			}
			if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
			{
				throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, "dataflowBlockOptions");
			}
			Contract.EndContractBlock();

			// Remember the batch size and work from the DefaultOrClone() result from here on.
			_batchSize = batchSize;
			GroupingDataflowBlockOptions effectiveOptions = dataflowBlockOptions.DefaultOrClone();

			// Source half: the callback handed to it completes both targets.
			_source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
				this,
				effectiveOptions,
				owner => ((BatchedJoinBlock<T1, T2>)owner).CompleteEachTarget());

			// Moves whatever the targets currently buffer into the source as one batch.
			// Besides the full-batch case, this also runs when no more messages will arrive,
			// so that leftover items still end up in a (smaller) batch.
			Action flushBufferedItems = () =>
			{
				if (_target1.Count <= 0 && _target2.Count <= 0)
				{
					return;
				}

				IList<T1> firstItems = _target1.GetAndEmptyMessages();
				IList<T2> secondItems = _target2.GetAndEmptyMessages();
				_source.AddMessage(Tuple.Create(firstItems, secondItems));
			};

			// Target half: both targets communicate through the same shared resources.
			_sharedResources = new BatchedJoinBlockTargetSharedResources(
				batchSize,
				effectiveOptions,
				flushBufferedItems,
				() =>
				{
					flushBufferedItems();
					_source.Complete();
				},
				_source.AddException,
				Complete);
			_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
			_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);

			// The source half can fault by itself (a task scheduler exception, for instance).
			// When that happens the target half is faulted too, so that it drops what it has buffered
			// and releases its reservations. This does not loop forever: the implementations tolerate
			// repeated completion requests and carry over only one of them.
#if NET_4_0_ABOVE
			_source.Completion.ContinueWith(
				(faultedSource, state) =>
				{
					IDataflowBlock owner = (BatchedJoinBlock<T1, T2>)state;
					Debug.Assert(faultedSource.IsFaulted, "The source must be faulted in order to trigger a target completion.");
					owner.Fault(faultedSource.Exception);
				},
				this,
				CancellationToken.None,
				Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted,
				TaskScheduler.Default);
#else
			Action<Task> faultTargetsAfterSourceFault = faultedSource =>
			{
				IDataflowBlock owner = this;
				Debug.Assert(faultedSource.IsFaulted, "The source must be faulted in order to trigger a target completion.");
				owner.Fault(faultedSource.Exception);
			};
			_source.Completion.ContinueWith(
				faultTargetsAfterSourceFault,
				CancellationToken.None,
				Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted,
				TaskScheduler.Default);
#endif

			// A cancellation request is answered by completing (declining on) both targets.
			Common.WireCancellationToComplete(
				effectiveOptions.CancellationToken,
				_source.Completion,
				state => ((BatchedJoinBlock<T1, T2>)state).CompleteEachTarget(),
				this);
#if FEATURE_TRACING
			DataflowEtwProvider tracing = DataflowEtwProvider.Log;
			if (tracing.IsEnabled())
			{
				tracing.DataflowBlockCreated(this, effectiveOptions);
			}
#endif
		}

		/// <summary>Gets the number of items each batch produced by this <see cref="BatchedJoinBlock{T1,T2}"/> contains.</summary>
		public int BatchSize
		{
			get { return _batchSize; }
		}

		/// <summary>Gets the target to which messages of the first type are offered.</summary>
		public ITargetBlock<T1> Target1
		{
			get { return _target1; }
		}

		/// <summary>Gets the target to which messages of the second type are offered.</summary>
		public ITargetBlock<T2> Target2
		{
			get { return _target2; }
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions)
		{
			return _source.LinkTo(target, linkOptions);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public bool TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>> filter, out Tuple<IList<T1>, IList<T2>> item)
		{
			return _source.TryReceive(filter, out item);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>>> items)
		{
			return _source.TryReceiveAll(out items);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
		public int OutputCount
		{
			get { return _source.OutputCount; }
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
		public Task Completion
		{
			get { return _source.Completion; }
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
		public void Complete()
		{
			Debug.Assert(_target1 != null, "_target1 not initialized");
			Debug.Assert(_target2 != null, "_target2 not initialized");

			_target1.Complete();
			_target2.Complete();
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
		void IDataflowBlock.Fault(Exception exception)
		{
			if (exception == null)
			{
				throw new ArgumentNullException("exception");
			}
			Contract.EndContractBlock();

			Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
			Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
			Debug.Assert(_source != null, "_source not initialized");

			// The exception is recorded on the source only while the shared state is not yet
			// declining permanently; that flag is read under the incoming lock.
			lock (_sharedResources._incomingLock)
			{
				if (!_sharedResources._decliningPermanently)
				{
					_source.AddException(exception);
				}
			}

			Complete();
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
		Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
			DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out bool messageConsumed)
		{
			return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
		bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
			DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
		{
			return _source.ReserveMessage(messageHeader, target);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
		void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
			DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
		{
			_source.ReleaseReservation(messageHeader, target);
		}

		/// <summary>
		/// Calls Complete on the first and then on the second target.
		/// </summary>
		private void CompleteEachTarget()
		{
			_target1.Complete();
			_target2.Complete();
		}

		/// <summary>Gets the number of messages waiting to be processed.  Intended for the debugger only, because the necessary locks are not taken.</summary>
		private int OutputCountForDebugger
		{
			get { return _source.GetDebuggingInformation().OutputCount; }
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
		public override string ToString()
		{
			return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
		}

		/// <summary>Gets the text shown for this block by the debugger display attribute.</summary>
		[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
		private object DebuggerDisplayContent
		{
			get
			{
				string blockName = Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
				return string.Format("{0}, BatchSize={1}, OutputCount={2}", blockName, BatchSize, OutputCountForDebugger);
			}
		}

		/// <summary>Gets the debugger display content of this instance.</summary>
		object IDebuggerDisplay.Content
		{
			get { return DebuggerDisplayContent; }
		}

		/// <summary>Provides a debugger type proxy for the BatchedJoin.</summary>
		private sealed class DebugView
		{
			/// <summary>The block this view describes.</summary>
			private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;
			/// <summary>Debugging information obtained from the source half of the viewed block.</summary>
			private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;

			/// <summary>Creates the debug view for a block.</summary>
			/// <param name="batchedJoinBlock">The batched join being viewed.</param>
			public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
			{
				Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
				_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
				_batchedJoinBlock = batchedJoinBlock;
			}

			/// <summary>Gets the messages waiting to be received.</summary>
			public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue
			{
				get { return _sourceDebuggingInformation.OutputQueue; }
			}

			/// <summary>Gets the number of batches created.</summary>
			public long BatchesCreated
			{
				get { return _batchedJoinBlock._sharedResources._batchesCreated; }
			}

			/// <summary>Gets the number of items remaining to form a batch.</summary>
			public int RemainingItemsForBatch
			{
				get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; }
			}

			/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
			public int BatchSize
			{
				get { return _batchedJoinBlock._batchSize; }
			}

			/// <summary>Gets the first target.</summary>
			public ITargetBlock<T1> Target1
			{
				get { return _batchedJoinBlock._target1; }
			}

			/// <summary>Gets the second target.</summary>
			public ITargetBlock<T2> Target2
			{
				get { return _batchedJoinBlock._target2; }
			}

			/// <summary>Gets the task being used for output processing.</summary>
			public Task TaskForOutputProcessing
			{
				get { return _sourceDebuggingInformation.TaskForOutputProcessing; }
			}

			/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
			public GroupingDataflowBlockOptions DataflowBlockOptions
			{
				get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; }
			}

			/// <summary>Gets whether the block is completed.</summary>
			public bool IsCompleted
			{
				get { return _sourceDebuggingInformation.IsCompleted; }
			}

			/// <summary>Gets the block's Id.</summary>
			public int Id
			{
				get { return Common.GetBlockId(_batchedJoinBlock); }
			}

			/// <summary>Gets the set of all targets linked from this block.</summary>
			public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets
			{
				get { return _sourceDebuggingInformation.LinkedTargets; }
			}

			/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
			public ITargetBlock<Tuple<IList<T1>, IList<T2>>> NextMessageReservedFor
			{
				get { return _sourceDebuggingInformation.NextMessageReservedFor; }
			}
		}
	}

	/// <summary>
	/// Provides a dataflow block that batches a specified number of inputs of potentially differing types
	/// provided to one or more of its targets.
	/// </summary>
	/// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
	/// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
	/// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
	[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
	[DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
	[SuppressMessage("Microsoft.Design", "CA1005:AvoidExcessiveParametersOnGenericTypes")]
	public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
	{
		/// <summary>The size of the batches generated by this BatchedJoin.</summary>
		private readonly int _batchSize;
		/// <summary>State shared among the targets.</summary>
		private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
		/// <summary>The target providing inputs of type T1.</summary>
		private readonly BatchedJoinBlockTarget<T1> _target1;
		/// <summary>The target providing inputs of type T2.</summary>
		private readonly BatchedJoinBlockTarget<T2> _target2;
		/// <summary>The target providing inputs of type T3.</summary>
		private readonly BatchedJoinBlockTarget<T3> _target3;
		/// <summary>The source side.</summary>
		private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;

		/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified batch size and the default grouping options.</summary>
		/// <param name="batchSize">The number of items to group into a batch.</param>
		/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
		public BatchedJoinBlock(Int32 batchSize) :
			this(batchSize, GroupingDataflowBlockOptions.Default)
		{ }

		/// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
		/// <param name="batchSize">The number of items to group into a batch.</param>
		/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</param>
		/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
		/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
		/// <exception cref="System.ArgumentException">
		/// The <paramref name="dataflowBlockOptions"/> request non-greedy mode or a bounded capacity, neither of which is supported.
		/// </exception>
		public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
		{
			// Validate arguments
			if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
			if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
			// Non-greedy mode and bounded capacity are rejected separately so that each failure
			// carries the message describing the option that is actually unsupported.
			if (!dataflowBlockOptions.Greedy) throw new ArgumentException(SR.Argument_NonGreedyNotSupported, "dataflowBlockOptions");
			if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded) throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, "dataflowBlockOptions");
			Contract.EndContractBlock();

			// Store arguments
			_batchSize = batchSize;
			dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

			// Configure the source
			_source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
					this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());

			// The action to run when a batch should be created.  This is typically called
			// when we have a full batch, but it will also be called when we're done receiving
			// messages, and thus when there may be a few stragglers we need to make a batch out of.
			// Note that the lambda reads _target1, _target2 and _target3, which are only assigned
			// further down in this constructor.
			Action createBatchAction = () =>
			{
				// Nothing is added to the source when all three targets are empty.
				if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
				{
					_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
				}
			};

			// Configure the targets
			_sharedResources = new BatchedJoinBlockTargetSharedResources(
					batchSize, dataflowBlockOptions,
					createBatchAction,
					() =>
					{
						// Flush any buffered items into a final batch before completing the source.
						createBatchAction();
						_source.Complete();
					},
					_source.AddException,
					Complete);
			_target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
			_target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
			_target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);

			// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
			// In those cases we need to fault the target half to drop its buffered messages and to release its 
			// reservations. This should not create an infinite loop, because all our implementations are designed
			// to handle multiple completion requests and to carry over only one.
#if NET_4_0_ABOVE
			_source.Completion.ContinueWith((completed, state) =>
			{
				var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state) as IDataflowBlock;
				Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
				thisBlock.Fault(completed.Exception);
			}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#else
			Action<Task> continuationAction = completed =>
			{
				var thisBlock = this as IDataflowBlock;
				Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
				thisBlock.Fault(completed.Exception);
			};
			_source.Completion.ContinueWith(continuationAction, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#endif
			// Handle async cancellation requests by declining on the target
			Common.WireCancellationToComplete(
					dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state).CompleteEachTarget(), this);
#if FEATURE_TRACING
			DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
			if (etwLog.IsEnabled())
			{
				etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
			}
#endif
		}

		/// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
		public Int32 BatchSize { get { return _batchSize; } }

		/// <summary>Gets a target that may be used to offer messages of the first type.</summary>
		public ITargetBlock<T1> Target1 { get { return _target1; } }

		/// <summary>Gets a target that may be used to offer messages of the second type.</summary>
		public ITargetBlock<T2> Target2 { get { return _target2; } }

		/// <summary>Gets a target that may be used to offer messages of the third type.</summary>
		public ITargetBlock<T3> Target3 { get { return _target3; } }

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
		{
			return _source.LinkTo(target, linkOptions);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter, out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
		{
			return _source.TryReceive(filter, out item);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
		[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
		public bool TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items) { return _source.TryReceiveAll(out items); }

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
		public int OutputCount { get { return _source.OutputCount; } }

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
		public Task Completion { get { return _source.Completion; } }

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
		public void Complete()
		{
			Debug.Assert(_target1 != null, "_target1 not initialized");
			Debug.Assert(_target2 != null, "_target2 not initialized");
			Debug.Assert(_target3 != null, "_target3 not initialized");

			_target1.Complete();
			_target2.Complete();
			_target3.Complete();
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
		void IDataflowBlock.Fault(Exception exception)
		{
			if (exception == null) throw new ArgumentNullException("exception");
			Contract.EndContractBlock();

			Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
			Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
			Debug.Assert(_source != null, "_source not initialized");

			// The exception is recorded on the source only while the shared state is not yet
			// declining permanently; that flag is read under the incoming lock.
			lock (_sharedResources._incomingLock)
			{
				if (!_sharedResources._decliningPermanently) _source.AddException(exception);
			}
			Complete();
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
		Tuple<IList<T1>, IList<T2>, IList<T3>> ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
				DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out Boolean messageConsumed)
		{
			return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
		bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
				DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
		{
			return _source.ReserveMessage(messageHeader, target);
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
		void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
				DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
		{
			_source.ReleaseReservation(messageHeader, target);
		}

		/// <summary>
		/// Invokes Complete on each of the three targets, in order.
		/// </summary>
		private void CompleteEachTarget()
		{
			_target1.Complete();
			_target2.Complete();
			_target3.Complete();
		}

		/// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
		private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
		public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }

		/// <summary>The data to display in the debugger display attribute.</summary>
		[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
		private object DebuggerDisplayContent
		{
			get
			{
				return string.Format("{0}, BatchSize={1}, OutputCount={2}",
						Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
						BatchSize,
						OutputCountForDebugger);
			}
		}
		/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
		object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

		/// <summary>Provides a debugger type proxy for the BatchedJoin.</summary>
		private sealed class DebugView
		{
			/// <summary>The block being viewed.</summary>
			private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;
			/// <summary>The source half of the block being viewed.</summary>
			private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;

			/// <summary>Initializes the debug view.</summary>
			/// <param name="batchedJoinBlock">The batched join being viewed.</param>
			public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
			{
				Contract.Requires(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
				_sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
				_batchedJoinBlock = batchedJoinBlock;
			}

			/// <summary>Gets the messages waiting to be received.</summary>
			public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
			/// <summary>Gets the number of batches created.</summary>
			public long BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }
			/// <summary>Gets the number of items remaining to form a batch.</summary>
			public int RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }

			/// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
			public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }
			/// <summary>Gets the first target.</summary>
			public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }
			/// <summary>Gets the second target.</summary>
			public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }
			/// <summary>Gets the third target.</summary>
			public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }

			/// <summary>Gets the task being used for output processing.</summary>
			public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

			/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
			public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }
			/// <summary>Gets whether the block is completed.</summary>
			public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
			/// <summary>Gets the block's Id.</summary>
			public int Id { get { return Common.GetBlockId(_batchedJoinBlock); } }

			/// <summary>Gets the set of all targets linked from this block.</summary>
			public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
			/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
			public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
		}
	}
}

namespace System.Threading.Tasks.Dataflow.Internal
{
	/// <summary>
	/// Represents one input side of a BatchedJoin. It buffers the messages it accepts and
	/// coordinates with the other targets of the same block through a shared resources object.
	/// </summary>
	/// <typeparam name="T">Specifies the type of data accepted by this target.</typeparam>
	[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
	[DebuggerTypeProxy(typeof(BatchedJoinBlockTarget<>.DebugView))]
	internal sealed class BatchedJoinBlockTarget<T> : ITargetBlock<T>, IDebuggerDisplay
	{
		/// <summary>State shared by every target that belongs to the same batched join instance.</summary>
		private readonly BatchedJoinBlockTargetSharedResources _sharedResources;
		/// <summary>Set once this particular target has been completed and refuses further messages.</summary>
		private bool _decliningPermanently;
		/// <summary>The messages accepted so far for the batch currently being assembled.</summary>
		private IList<T> _messages = new List<T>();

		/// <summary>Initializes the target and registers it with the shared resources.</summary>
		/// <param name="sharedResources">The shared resources used by all targets associated with this batched join.</param>
		internal BatchedJoinBlockTarget(BatchedJoinBlockTargetSharedResources sharedResources)
		{
			Contract.Requires(sharedResources != null, "Targets require a shared resources through which to communicate.");

			_sharedResources = sharedResources;

			// Announce this target to the shared state. No lock is taken here, so this
			// registration is only valid while the owning batched join is being constructed.
			_sharedResources._remainingAliveTargets += 1;
		}

		/// <summary>Gets the number of messages currently buffered in this target.</summary>
		internal int Count
		{
			get { return _messages.Count; }
		}

		/// <summary>Hands the buffered messages to the caller and starts a fresh, empty buffer.</summary>
		/// <returns>The list of messages that had been buffered by this target.</returns>
		internal IList<T> GetAndEmptyMessages()
		{
			// The shared incoming lock is expected to be held by the caller.
			Common.ContractAssertMonitorStatus(_sharedResources._incomingLock, held: true);

			IList<T> drained = _messages;
			_messages = new List<T>();
			return drained;
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
		public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
		{
			// Argument validation
			if (!messageHeader.IsValid)
			{
				throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
			}
			if (source == null && consumeToAccept)
			{
				throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
			}
			Contract.EndContractBlock();

			lock (_sharedResources._incomingLock)
			{
				// Either this target or the whole block may have stopped accepting messages.
				bool alreadyDeclining = _decliningPermanently || _sharedResources._decliningPermanently;
				if (alreadyDeclining)
				{
					return DataflowMessageStatus.DecliningPermanently;
				}

				// When asked to, take ownership of the message from the source before buffering it.
				if (consumeToAccept)
				{
					Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

					bool wasConsumed;
					messageValue = source.ConsumeMessage(messageHeader, this, out wasConsumed);
					if (!wasConsumed)
					{
						return DataflowMessageStatus.NotAvailable;
					}
				}
				_messages.Add(messageValue);

				// One fewer item is needed for the batch; if none remain, signal that a batch is ready.
				_sharedResources._remainingItemsInBatch--;
				if (_sharedResources._remainingItemsInBatch == 0)
				{
					_sharedResources._batchSizeReachedAction();
				}

				return DataflowMessageStatus.Accepted;
			}
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
		public void Complete()
		{
			lock (_sharedResources._incomingLock)
			{
				// Only the first call has any effect.
				if (_decliningPermanently)
				{
					return;
				}

				_decliningPermanently = true;

				// This target no longer counts as alive; when it was the last one,
				// let the shared resources know that every target is now declining.
				_sharedResources._remainingAliveTargets--;
				if (_sharedResources._remainingAliveTargets == 0)
				{
					_sharedResources._allTargetsDecliningPermanentlyAction();
				}
			}
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
		void IDataflowBlock.Fault(Exception exception)
		{
			if (exception == null)
			{
				throw new ArgumentNullException("exception");
			}
			Contract.EndContractBlock();

			lock (_sharedResources._incomingLock)
			{
				// The exception is only recorded while neither this target nor the block is declining.
				bool stillAccepting = !(_decliningPermanently || _sharedResources._decliningPermanently);
				if (stillAccepting)
				{
					_sharedResources._exceptionAction(exception);
				}
			}

			// The completion action is invoked outside of the lock, whether or not the exception was recorded.
			_sharedResources._completionAction();
		}

		/// <include file='..\XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
		Task IDataflowBlock.Completion
		{
			get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
		}

		/// <summary>The text shown for this instance by the debugger display attribute.</summary>
		[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
		private object DebuggerDisplayContent
		{
			get
			{
				var displayName = Common.GetNameForDebugger(this);
				int bufferedCount = _messages.Count;
				return string.Format("{0} InputCount={1}", displayName, bufferedCount);
			}
		}
		/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
		object IDebuggerDisplay.Content
		{
			get { return DebuggerDisplayContent; }
		}

		/// <summary>Provides a debugger type proxy for the BatchedJoinBlockTarget.</summary>
		private sealed class DebugView
		{
			/// <summary>The target whose state this view exposes.</summary>
			private readonly BatchedJoinBlockTarget<T> _target;

			/// <summary>Initializes the debug view.</summary>
			/// <param name="batchedJoinBlockTarget">The batched join target being viewed.</param>
			public DebugView(BatchedJoinBlockTarget<T> batchedJoinBlockTarget)
			{
				Contract.Requires(batchedJoinBlockTarget != null, "Need a block with which to construct the debug view.");
				_target = batchedJoinBlockTarget;
			}

			/// <summary>Gets the messages buffered by the target and not yet emitted in a batch.</summary>
			public IEnumerable<T> InputQueue
			{
				get { return _target._messages; }
			}

			/// <summary>Gets whether the target, or the block as a whole, is declining further messages.</summary>
			public bool IsDecliningPermanently
			{
				get
				{
					if (_target._decliningPermanently)
					{
						return true;
					}
					return _target._sharedResources._decliningPermanently;
				}
			}
		}
	}

	/// <summary>Provides a container for resources shared across all targets used by the same BatchedJoinBlock instance.</summary>
	internal sealed class BatchedJoinBlockTargetSharedResources
	{
		/// <summary>Initializes the shared resources.</summary>
		/// <param name="batchSize">The size of a batch to create. Must be at least 1.</param>
		/// <param name="dataflowBlockOptions">
		/// The options used to configure the shared resources.  Assumed to be immutable.
		/// Its ActualMaxNumberOfGroups value is read after every completed batch.
		/// </param>
		/// <param name="batchSizeReachedAction">The action to invoke when a batch is completed.</param>
		/// <param name="allTargetsDecliningAction">The action to invoke when no more targets are accepting input.</param>
		/// <param name="exceptionAction">The action to invoke when an exception needs to be logged.</param>
		/// <param name="completionAction">The action to invoke when completing, typically invoked due to a call to Fault.</param>
		internal BatchedJoinBlockTargetSharedResources(
				int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions,
				Action batchSizeReachedAction, Action allTargetsDecliningAction,
				Action<Exception> exceptionAction, Action completionAction)
		{
			// Every argument is used unconditionally later on (the options inside the batch-reached
			// delegate, the exception/completion actions by the targets' Fault implementation), so
			// validate all of them up front rather than failing later with a null dereference.
			Debug.Assert(batchSize >= 1, "A positive batch size is required.");
			Debug.Assert(dataflowBlockOptions != null, "Need options to determine the maximum number of batches.");
			Debug.Assert(batchSizeReachedAction != null, "Need an action to invoke for each batch.");
			Debug.Assert(allTargetsDecliningAction != null, "Need an action to invoke when all targets have declined.");
			Debug.Assert(exceptionAction != null, "Need an action to invoke when an exception has to be logged.");
			Debug.Assert(completionAction != null, "Need an action to invoke when the block has to be completed.");

			_incomingLock = new object();
			_batchSize = batchSize;

			// _remainingAliveTargets will be incremented when targets are added.
			// They must be added during construction of the BatchedJoin<...>.
			_remainingAliveTargets = 0;
			_remainingItemsInBatch = batchSize;

			// Configure what to do when batches are completed and/or all targets start declining
			_allTargetsDecliningPermanentlyAction = () =>
			{
				// Invoke the caller's action
				allTargetsDecliningAction();

				// Don't accept any more messages.  When every target has been completed,
				// each individual target's declining flag already enforces this, so setting
				// the shared flag is a precaution in that case.  It is what stops the targets
				// when this delegate runs because the maximum number of batches was reached.
				_decliningPermanently = true;
			};
			_batchSizeReachedAction = () =>
			{
				// Invoke the caller's action
				batchSizeReachedAction();
				_batchesCreated++;

				// If the maximum number of batches allowed by the options has now been
				// produced, invoke the completion logic.
				if (_batchesCreated >= dataflowBlockOptions.ActualMaxNumberOfGroups) _allTargetsDecliningPermanentlyAction();

				// Otherwise, get ready for the next batch.
				else _remainingItemsInBatch = _batchSize;
			};
			_exceptionAction = exceptionAction;
			_completionAction = completionAction;
		}

		/// <summary>
		/// A lock used to synchronize all incoming messages on all targets. It protects all of the rest
		/// of the shared resources' state and will be held while invoking the delegates.
		/// </summary>
		internal readonly object _incomingLock;
		/// <summary>The size of the batches to generate.</summary>
		internal readonly int _batchSize;

		/// <summary>The action to invoke when enough elements have been accumulated to make a batch.</summary>
		internal readonly Action _batchSizeReachedAction;
		/// <summary>The action to invoke when all targets are declining further messages.</summary>
		internal readonly Action _allTargetsDecliningPermanentlyAction;
		/// <summary>The action to invoke when an exception has to be logged.</summary>
		internal readonly Action<Exception> _exceptionAction;
		/// <summary>The action to invoke when the owning block has to be completed.</summary>
		internal readonly Action _completionAction;

		/// <summary>The number of items remaining to form a batch.</summary>
		internal int _remainingItemsInBatch;
		/// <summary>The number of targets still alive (i.e. not declining all further messages).</summary>
		internal int _remainingAliveTargets;
		/// <summary>Whether all targets should decline all further messages.</summary>
		internal bool _decliningPermanently;
		/// <summary>The number of batches created.</summary>
		internal long _batchesCreated;
	}
}
